RxJava 学习笔记
观察者、被观察者
Observable、Observer
1 | // 1. 创建上游(被观察者) |
ObservableEmitter
- ObservableEmitter:可理解为发射器,可发射3种类型的事件,即调用onNext()、onComplete()、onError() 方法。
- 上游可以发送多个 onNext() 和 onComplete(),只能发送一个 onError();可以不发送 onComplete() 和 onError()。
- 上游发送 onComplete() 或者 onError() 后会继续发送其他事件,但是下游接收到 onComplete() 或者 onError() 事件之后不再接收其他事件。
- 上游可以先发送 onError() 再发送 onComplete(),不能先发送 onComplete() 再发送 onError()。
Disposable
- 调用 dispose() 方法可以切断上下游之间的连接,上游可以继续发送除 onError() 之外的事件,但是下游不再接收事件。
- 可通过两种方式获取 Disposable 对象,分别为:
- 在 onSubscribe() 回调方法的参数中获取;
- 某些重载的订阅方法 subscribe() 返回值是 Disposable 对象:
1
2
3
4
5
6public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
Flowable、Subscriber
1 | // 1. 创建上游 |
BackpressureStrategy
- 表示背压策略,有以下几种:
- MISSING:onNext() 事件没有任何缓存和丢弃,下游要处理溢出
- ERROR:缓存区默认为 128,当上游发送事件的速度太快而下游处理不过来时会抛出
MissingBackpressureException
- BUFFER:缓存区大小无限制,使用不当会 OOM
- DROP:缓存最近的 onNext() 事件
- LATEST:缓存区会保留最后一个 onNext() 事件
Subscription
- 和 Disposable 类似,调用 cancel() 表示请求停止发送事件,可切断上下游的连接
- 必须显示调用 request(long n),表示下游可以处理 n 个事件
FlowableEmitter
- 继承自 Emitter,即 onNext()、onError()、onComplete()
- requested():表示下游的处理能力,即下游 s.request() 的大小
- isCancelled():下游是否请求停止发送,即 s.cancel()
Single、SingleObserver
- 上游可以发送多个事件,但是下游只能接收到一个事件
- onSuccess() 和 onError() 是互斥
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39// 1. 创建上游
Single<Integer> single = Single.create(new SingleOnSubscribe<Integer>() {
public void subscribe(SingleEmitter<Integer> e) throws Exception {
for (int i = 0; i < 3; i++) {
e.onSuccess(i);
Log.d(TAG, "subscribe: " + i);
}
}
});
// 2. 创建下游
SingleObserver<Integer> observer = new SingleObserver<Integer>() {
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
public void onSuccess(Integer integer) {
// 相当于 onNext() 和 onComplete()
Log.d(TAG, "onSuccess: " + integer);
}
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
};
// 3. 建立连接(订阅)
single.subscribe(observer);
// 4. 打印结果:
onSubscribe:
onSuccess: 0
subscribe: 0
subscribe: 1
subscribe: 2
SingleEmitter
- 是个接口,onSuccess()、onError()
Completable、CompletableObserver
1 | // 1. 创建上游 |
- 上游可以发送多个事件,但是下游只能接收到一个事件
- onComplete() 和 onError() 是互斥
Maybe、MaybeObserver
1 | // 1. 创建上游 |
- 类似于 Single 和 Completable 的混合体,onSuccess()、onComplete()、onError() 三者互斥
线程调度
subscribeOn()
- 表示上游发送事件的线程
- 有多个 subscribeOn() 时,上游只会在第一个 subscribeOn() 表示的线程发送事件
observeOn()
- 表示下游接收事件的线程
- 有多个 observeOn() 时,下游只会在最后一个 observeOn() 表示的线程接收事件
Scheduler
Schedulers.immediate()
: 默认的Scheduler
,直接在当前线程运行。Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。Schedulers.io()
: I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler
。行为模式和newThread()
差不多,区别在于io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()
比newThread()
更有效率。不要把计算工作放在io()
中,可以避免创建不必要的线程。Schedulers.computation()
: 计算所使用的Scheduler
。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler
使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()
中,否则 I/O 操作的等待时间会浪费 CPU。AndroidSchedulers.mainThread()
: Android 专用的,它指定的操作将在 Android 主线程运行。
背压策略
ERROR
- error:当下游没有请求数据时,上游最多只能发送128个事件,多于 128 时将会调用 onError() 抛出 MissingBackpressureException 异常;当上下游流速均衡(即上游发送数据和下游处理数据的速度相同)时,上游可以发送无限数据,不会出现 OOM
- 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 ErrorAsyncEmitter 的父类 NoOverflowBaseAsyncEmitter 的
onNext()
方法中完成的:如果 AtomicLong 不等于 0 就发送一个数据并且 AtomicLong 减 1,前 128 个数据都会到达下游的缓存队列中进行缓存;当 AtomicLong 为 0 时不再发送数据到下游的缓存队列,而是调用 ErrorAsyncEmitter 的实现方法onOverflow()
,onOverflow()
方法里面调用onError()
方法抛出 MissingBackpressureException 异常 - 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的
runAsync()
方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就e++
,每当取到第 96 个数据的时候先r = requested -= 96; e = 0L;
,然后再请求上游发送 96 个数据到下游的缓存队列(此时如果上游继续发送数据(e.onNext(i)),由于 AtomicLong 大于 0 会继续发送数据到下游缓存队列,否则就不发送数据到下游),与此同时下游继续从缓存队列取数据发送出去,发送一个数据就e++
,直到while(e != r)
不成立导致不再发送给外界。此时如果外界主动调用s.request(n)
请求数据将继续发送数据给外界 - 上游发送完全部数据之前,如果上游发送过的所有数据比下游请求过的所有数据
>= 96
时抛出 MissingBackpressureException 异常。因为每次下游都是请求 96 个数据,96 保存在上游的 AtomicLong 中,发送一个数据就减 1,当 AtomicLong 为 0 时就抛出 MissingBackpressureException 异常
BUFFER
- buffer: 上游可以发送无限个数据,不会出现 MissingBackpressureException 异常,但是会 OOM
- 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 BufferAsyncEmitter 的
onNext()
方法中先用queue.offer(t)
保存发送过的所有数据,然后再调用drain()
方法完成数据发送:每发送一个数据就减 1,前 128 个数据都会到达下游的缓存队列中进行缓存,当 AtomicLong 为 0 时不再发送数据到下游的缓存队列 - 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的
runAsync()
方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就e++
,每当取到第 96 个数据的时候先r = requested -= 96; e = 0L;
,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就e++
,这时发送了 4 个数据后while(e != r)
不成立导致不再发送给外界 - 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的
request()
方法设置 AtomicLong 的值为 96,再去调用 BufferAsyncEmitter 实现的onRequested()
方法,onRequested()
中再调用drain()
方法完成数据的发送。drain()
方法会从 queue 中取出未发送过的数据发送给下游的缓存队列,然后下游再从缓存队列中取出数据发送给外界 - 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的
request()
方法中去调用空方法onRequested()
,而onNext()
方法不再被调用导致不再发送数据到下游缓存队列,仅仅是从下游缓存队列中取出数据发送给外界
DROP
- drop: 上游可发送无限个数据
- 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 DropAsyncEmitter 的父类 NoOverflowBaseAsyncEmitter 的
onNext()
方法中完成的:如果 AtomicLong 不等于 0 就发送一个数据并且 AtomicLong 减 1,前 128 个数据都会到达下游的缓存队列中进行缓存;当 AtomicLong 为 0 时不再发送数据到下游的缓存队列,而是调用onOverflow()
,onOverflow()
是个空方法,也就是丢弃数据 - 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的
runAsync()
方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就e++
,每当取到第 96 个数据的时候先r = requested -= 96; e = 0L;
,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就e++
,这时发送了 4 个数据后while(e != r)
不成立导致不再发送给外界 - 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的
request()
方法中去调用空方法onRequested()
,而onNext()
方法不再被调用导致不再发送数据到下游缓存队列,仅仅是从下游缓存队列中取出数据发送给外界
LATEST
- latest: 上游可以发送无限个数据
- 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 LatestAsyncEmitter 的
onNext()
方法中先用 queue(AtomicReference对象) 保存当前发送的数据,所以发送完所有数据后 queue 保存的是最后一个数据,然后再调用drain()
方法完成数据发送:每发送一个数据就减 1,前 128 个数据都会到达下游的缓存队列中进行缓存,当 AtomicLong 为 0 时不再发送数据到下游的缓存队列 - 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的
runAsync()
方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就e++
,每当取到第 96 个数据的时候先r = requested -= 96; e = 0L;
,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就e++
,这时发送了 4 个数据后while(e != r)
不成立导致不再发送给外界 - 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的
request()
方法中去调用 LatestAsyncEmitter 实现的onRequested()
方法,onRequested()
中再调用drain()
方法完成数据的发送。drain()
方法会把 queue 中保存的最后一个数据发送给下游的缓存队列,然后下游再从缓存队列中取出数据发送给外界,所以 LATEST 策略总是可以请求到上游的最后一个数据
MISSING
- missing: 上游没有背压策略,需要下游通过背压操作符(
onBackpressureBuffer()
、onBackpressureDrop()
、onBackpressureLatest()
)来指定背压策略 - 当下游没有指定背压策略时会抛出 MissingBackpressureException 异常
创建操作符
just
- 最多只能发送 10 个数据,最后发送 onComplete
- 当发送数据超过 2 个时,内部调用 fromArray()
1 | Observable.just(1, 2).subscribe(new Observer<Integer>() { |
fromArray
1 | fromArray(T... items) |
- 如果参数是个空数组,直接调用 empty() 创建符;如果只有一个元素,则调用 just()
1
2
3
4
5
6
7
8
9
10public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
1 | int[] ints = new int[]{1, 2}; |
empty
- 只发送 onComplete
1 | Observable.empty().subscribe(new Observer<Object>() { |
fromIterable
1 | List<Integer> list = new ArrayList<>(); |
timer
1 | timer(long delay, TimeUnit unit) |
- 延迟 delay 发送一个 0 和 onComplete
- 默认在子线程发送事件,可指定发送事件所在的线程
1 | Observable.timer(1, TimeUnit.SECONDS).subscribe(new Observer<Long>() { |
interval
1 | interval(long period, TimeUnit unit) // 每隔 period 发送一次 onNext() 事件 |
- 默认在子线程发送事件,可通过参数指定发送事件所在的线程
- 从 0 开始无限制的发送 onNext 事件
1 | Observable.interval(1, TimeUnit.SECONDS).subscribe(new Observer<Long>() { |
intervalRange
1 | intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) |
- 一开始延迟 initialDelay,然后从 start 开始每隔 period 发送一次 onNext,一共发送 count 个事件,可指定发送事件所在的线程
- 默认是在子线程发送事件,发送了 count 个 onNext 后会发送 onComplete
1 | Observable.intervalRange(3, 3, 0, 1, TimeUnit.SECONDS).subscribe(new Observer<Long>() { |
range、rangeLong
1 | range(final int start, final int count) |
- 从 start 开始发送 count 个 onNext,最后发送 onComplete
1 | Observable.range(1, 3).subscribe(new Observer<Integer>() { |
zip
- zip:将多个 Observable 发送的事件组合起来,然后再发送这个新的事件
- 严格按照发送事件的顺序来组合新的事件
- 下游收到的事件数量和上游发送最少的事件相同,即 observable1 发送 1 个事件,observable2 发送 2 个事件,下游会收到 1 个事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34// observable1
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(i);
}
}).subscribeOn(Schedulers.io());
// observable2
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
}
}).subscribeOn(Schedulers.io());
// zip
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
}, new Consumer<Throwable>() {
public void accept(Throwable throwable) throws Exception {
Log.w(TAG, throwable);
}
});
sample
- sample:每隔一定的时间从上游取一个事件发送给下游
1
2// 每隔 2 秒从上游取出一个事件发送给下游
Observable.create(...).sample(2, TimeUnit.SECONDS)
filter
- filter:过滤事件,符合条件的才发送到下游
1
2
3
4
5
6
7Observable.create(...).filter(new Predicate<Object>() {
public boolean test(Object o) throws Exception {
// 返回 true 才继续往下走
return ...;
}
})
take
1 | take(long count) // 发送 count 个事件给下游 |
1 | // 发送 3 个事件 |